#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import testlib


def line_sel(i):
    """Return the CSS selector for row *i* of the terminal's accessibility tree.

    The row is addressed with ``:nth-child``, so *i* counts from 1.
    """
    rows = '.terminal .xterm-accessibility-tree div'
    return '%s:nth-child(%d)' % (rows, i)


class TestTerminal(testlib.MachineCase):
    def setUp(self):
        """Give the admin shell a predictable prompt and clear failed units."""
        super().setUp()

        # Appended to admin's .bashrc so that we get what we expect: a fixed
        # prompt, and a PROMPT_COMMAND that prints a title escape sequence
        # built from user, short host name and working directory
        bashrc_additions = r"""
PS1="\u@local \W]\$ "
PROMPT_COMMAND='printf "\033]0;%s@%s:%s\007" "${USER}" "${HOSTNAME%%.*}" "${PWD/#$HOME/\~}"'
"""
        self.write_file("/home/admin/.bashrc", bashrc_additions, append=True)

        # Failed units would otherwise show up in the first terminal line
        machine = self.machine
        machine.execute("systemctl reset-failed")

    def wait_line(self, lineno, text):
        """Wait (via ``wait_text``) for terminal row *lineno* to show *text*.

        If the wait fails, the rows around *lineno* are printed to help with
        debugging and the original error is re-raised.
        """
        b = self.browser
        try:
            b.wait_text(line_sel(lineno), text)
        except Exception:
            print("-----")
            try:
                for j in range(max(1, lineno - 5), lineno + 5):
                    print(b.text(line_sel(j)))
            except Exception as dump_error:
                # The dump is best-effort: stop at the first row that cannot
                # be read, so that this secondary problem does not replace
                # the wait failure reported below
                print(f"(terminal dump aborted: {dump_error})")
            print("-----")
            # bare raise keeps the original exception and its traceback
            raise

    def clear_terminal(self):
        """Type ``clear`` into the terminal and wait until it took effect."""
        browser = self.browser

        # Type the command without running it and wait for it to be echoed
        browser.input_text("clear")
        browser.wait_js_cond("ph_text('.terminal').indexOf('clear') >= 0")

        # Run it; afterwards the word must be gone from the terminal rows
        browser.key("Enter")
        browser.wait_js_cond("ph_text('.xterm-accessibility-tree').indexOf('clear') < 0")

    @testlib.nondestructive
    def testBasic(self):
        # Walk through the terminal page end to end: run commands, reset and
        # reconnect, copy/paste, $PATH, color themes and font size persistence
        b = self.browser
        m = self.machine

        self.login_and_go("/system/terminal")
        blank_state = ' '

        # wait until first line is not empty; trim() reduces a blank row to '',
        # so any other value means the row has real content
        n = 1
        b.wait_visible(".terminal .xterm-accessibility-tree")
        function_str = "(function (sel) { return ph_text(sel).trim() != ''})"
        b.wait_js_func(function_str, line_sel(n))

        self.clear_terminal()
        # now we should get a clean prompt
        b.wait_in_text(line_sel(n), '$')

        # remember the prompt text; later prompt lines are matched against it
        prompt = b.text(line_sel(n))

        # Make sure we are started in home directory
        # Account for non-standard prompting
        if "]" not in prompt:
            self.assertIn(":~$", prompt)
        else:
            self.assertIn("~]$", prompt)

        # Run some commands
        b.input_text("whoami\n")
        self.wait_line(n + 1, "admin")

        self.wait_line(n + 2, prompt)

        b.input_text('echo -e "1\\u0041"\n')
        self.wait_line(n + 3, '1A')
        self.wait_line(n + 4, prompt)

        # non-UTF8 data
        m.execute(r"echo -e 'hello\xFF\x01\xFF\x02world' > " + self.vm_tmpdir + "/garbage.txt")
        b.input_text(f'cat {self.vm_tmpdir}/garbage.txt\n')
        self.wait_line(n + 5, 'helloworld')
        self.wait_line(n + 6, prompt)

        # The '@' sign is in the default prompt
        b.wait_in_text(".terminal-title", '@')

        # output flooding
        b.input_text("seq 1000000\n")
        with b.wait_timeout(300):
            b.wait_in_text(".terminal .xterm-accessibility-tree", "9999989999991000000admin@")

        b.assert_pixels("#terminal .terminal-group", "header", ignore=[".terminal-title"])
        b.assert_pixels("#terminal .xterm-text-layer", "text")

        # now reset terminal
        b.click('button:contains("Reset")')

        # assert that the output from earlier is gone
        self.wait_line(n + 1, blank_state)
        self.assertNotIn('admin', b.text(line_sel(n + 1)))

        # Check that when we `exit` we can still reconnect with the 'Reset' button
        b.input_text("exit\n")
        b.wait_in_text(".terminal .xterm-accessibility-tree", "disconnected")
        b.click('button:contains("Reset")')
        self.wait_line(n, prompt)
        b.wait_not_in_text(".terminal .xterm-accessibility-tree", "disconnected")

        def select_line(sel, width):
            # Select line by dragging mouse
            # Height of a line is around 14px, so start approx. in the middle of line
            b.mouse(sel, "mousedown", 0, 7)
            b.mouse(sel, "mousemove", width, 7)
            b.mouse(sel, "mouseup", width, 7)

        # Firefox does not support setting of permissions
        # and therefore we cannot test copy/paste with context menu
        if b.browser != "firefox":
            b.grant_permissions("clipboard-read", "clipboard-write")

            # Execute command
            self.wait_line(n, prompt)
            b.input_text('echo "XYZ"\n')
            echo_result_line = n + 1

            self.wait_line(echo_result_line, "XYZ")

            sel = line_sel(echo_result_line)

            # Highlight 40px (3 letters, never wider than ~14px)
            select_line(sel, 40)

            # Right click to context menu and pick copy
            b.mouse(sel, "click", btn=2)
            # Skipping mobile and medium layouts as these are the same with the default desktop layout
            b.assert_pixels("#terminal .contextMenu", "context-menu", skip_layouts=["mobile", "medium"])
            b.click('.contextMenu li:first-child button')

            # Right click and pick paste
            b.mouse(sel, "click", btn=2)
            b.click('.contextMenu li:nth-child(2) button')

            # Wait for text to show up
            self.wait_line(echo_result_line + 1, prompt + "XYZ")
            b.key('Enter')
            b.wait_in_text(line_sel(echo_result_line + 2), 'XYZ')

        # now reset terminal
        b.click('button:contains("Reset")')

        # assert that the output from earlier is gone
        self.wait_line(n + 1, blank_state)

        # Execute another command
        self.wait_line(n, prompt)
        b.input_text('echo "foo"\n')
        echo_result_line = n + 1

        self.wait_line(echo_result_line, "foo")

        sel = line_sel(echo_result_line)
        # Highlight 40px (3 letters, never wider than ~14px)
        select_line(sel, 40)

        # Use keyboard shortcuts to copy (Ctrl+Insert) and paste (Shift+Insert) text
        b.key("Insert", modifiers=["Control"])
        b.key("Insert", modifiers=["Shift"])

        # Wait for text to show up
        self.wait_line(echo_result_line + 1, prompt + "foo")
        b.key("Backspace", repeat=3)

        # Key-who-must-not-be-named, non-breakable space does weird things,
        # see https://github.com/cockpit-project/cockpit/issues/21213
        # this must not crash the session, should be ignored
        b.input_text("echo space")
        b.key("Space", modifiers=["Control"])
        b.key("Enter")
        self.wait_line(echo_result_line + 2, "space")

        # check that we get a sensible $PATH; this varies across OSes, so don't be too strict about it
        b.input_text('clear\n')
        b.input_text("echo $PATH > /tmp/path\n")
        # don't use wait_line() for the full match here, as line breaks get in the way; just wait until command has run
        self.wait_line(echo_result_line, prompt)
        path = m.execute("cat /tmp/path").strip()
        if m.ws_container:
            self.assertIn("/usr/local/bin:/usr/bin", path)
        elif m.image == "arch":
            self.assertIn("/usr/local/sbin:/usr/local/bin:/usr/bin", path)
        else:
            self.assertIn("/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", path)
        self.assertNotIn(":/.local", path)  # would happen with empty $HOME

        def check_theme_select(name, style):
            # Pick theme `name` and wait for the viewport style to contain `style`
            b.select_from_dropdown("#theme-select + div select", name)
            b.wait_attr_contains(".xterm-viewport", "style", style)

        # Check that color theme changes
        white_style = "background-color: rgb(255, 255, 255);"

        check_theme_select("black-theme", "background-color: rgb(0, 0, 0);")
        check_theme_select("dark-theme", "background-color: rgb(0, 43, 54);")
        check_theme_select("light-theme", "background-color: rgb(253, 246, 227);")
        check_theme_select("white-theme", white_style)

        # Test changing of font size
        b.wait_val("#toolbar input", 16)
        current_style = b.attr(".xterm-accessibility-tree div:first-child", "style")
        b.click("#toolbar button[aria-label='Increase by one']")
        new_style = b.attr(".xterm-accessibility-tree div:first-child", "style")
        self.assertNotEqual(current_style, new_style)

        # Unfocus the terminal before switching away to stop the
        # cursor from blinking, which hopefully avoids internal
        # Firefox errors related to canvas rendering in invisible
        # iframes.

        b.focus("#toolbar input")

        # Relogin and white-style and bigger font should be remembered
        # Relogin on a different page, as relogging in on the terminal prompts asking if you want to leave the site

        b.go("/system")
        b.relogin("/system")
        b.go("/system/terminal")
        b.enter_page("/system/terminal")
        b.wait_visible('.terminal')
        b.wait_attr_contains(".xterm-viewport", "style", white_style)
        b.wait_val("#toolbar input", 17)
        b.wait_attr(".xterm-accessibility-tree div:first-child", "style", new_style)
        b.click("#toolbar button[aria-label='Decrease by one']")
        b.wait_val("#toolbar input", 16)
        b.wait_attr(".xterm-accessibility-tree div:first-child", "style", current_style)

        # Check limit for font size: step down from 16 until the button gets disabled at 6
        for i in range(15, 5, -1):
            b.click("#toolbar button[aria-label='Decrease by one']")
            b.wait_val("#toolbar input", i)
        b.wait_visible("#toolbar button[aria-label='Decrease by one']:disabled")

    @testlib.nondestructive
    def testOnlyTerminal(self):
        # Configure the terminal page as cockpit's Shell and check that a
        # command can be run in it right after logging in
        b = self.browser
        m = self.machine

        m.write("/etc/cockpit/cockpit.conf", "[WebService]\nShell = /system/terminal.html\n")
        m.start_cockpit()
        b.open("/")

        b.try_login("admin", "foobar")
        b.wait_visible("#terminal")

        # wait until first line is not empty; trim() reduces a blank row to '',
        # so any other value means the row has real content
        n = 1
        b.wait_visible(".terminal .xterm-accessibility-tree")
        b.wait_js_func("(function (sel) { return ph_text(sel).trim() != ''})", line_sel(n))

        # execute a command we can see the effect of
        b.wait_js_cond("ph_text('.terminal').indexOf('uid=') == -1")
        b.input_text("id\n")
        b.wait_js_cond("ph_text('.terminal').indexOf('uid=') >= 0")

    @testlib.nondestructive
    def testOpenPath(self):
        # Check the "?path=" URL option: valid directory, missing directory,
        # non-directory, and the confirmation shown while a process is running
        b = self.browser
        m = self.machine

        self.login_and_go("/system/terminal")

        # Wait until first line is not empty; trim() reduces a blank row to '',
        # so any other value means the row has real content
        b.wait_visible(".terminal .xterm-accessibility-tree")
        function_str = "(function (sel) { return ph_text(sel).trim() != ''})"
        b.wait_js_func(function_str, line_sel(1))

        self.clear_terminal()

        # Change to directory that exists
        b.go("#/?path=/tmp")
        b.wait_visible(".terminal .xterm-accessibility-tree")
        self.wait_line(1, "disconnected")
        self.clear_terminal()
        b.input_text("pwd\n")
        self.wait_line(2, "/tmp")
        b.wait_not_present(".pf-v6-c-alert")

        # Error on dir non-exist
        b.go("#/?path=/doesnotexist")
        b.wait_in_text(".pf-v6-c-alert", "does not exist")
        # If there's an error, we shouldn't reset the terminal
        b.input_text("pwd\n")
        self.wait_line(4, "/tmp")
        # Clear the error
        b.click('.pf-v6-c-alert__action button')
        b.wait_not_present(".pf-v6-c-alert")

        # Error on change to non-dir
        b.go("#/?path=/etc%2Fos-release")
        b.wait_in_text(".pf-v6-c-alert", "not a directory")
        # If there's an error, we shouldn't reset the terminal
        b.input_text("pwd\n")
        self.wait_line(6, "/tmp")
        # Clear the error
        b.click('div.pf-v6-c-alert__action button')
        b.wait_not_present(".pf-v6-c-alert")
        b.wait_js_cond('window.location.hash == "#/"')

        # Prompt for change if terminal busy
        # The command prints a marker on row 8 and its own pid on row 9
        b.input_text("sh -c 'echo busybusybusy; echo $$; exec sleep infinity'\n")
        b.wait_in_text(line_sel(8), 'busybusybusy')
        pid = b.text(line_sel(9))
        b.go("#/?path=/tmp")
        b.wait_in_text(".pf-v6-c-alert.pf-m-danger", "Running process prevents directory change")
        # Cancel change
        b.click("button:contains('Cancel')")
        b.wait_not_present(".pf-v6-c-alert")
        b.wait_js_cond('window.location.hash == "#/"')
        # Check busy process is still running
        m.execute(f"kill -0 {pid}")
        b.wait_in_text(line_sel(8), 'busybusybusy')
        # Confirm change
        b.go("#/?path=/var")
        b.wait_in_text(".pf-v6-c-alert.pf-m-danger", "Running process prevents directory change")
        b.click("button:contains('Change directory')")
        b.wait_not_present(".pf-v6-c-alert.pf-m-danger")
        self.wait_line(1, "disconnected")
        self.clear_terminal()
        b.input_text("pwd\n")
        self.wait_line(2, "/var")
        # The busy process must be gone after the confirmed change
        self.assertIn("No such process", m.execute(f"kill -0 {pid} 2>&1 || true"))

    def testExternalState(self):
        # Exercise the playground terminal page: output must survive switching
        # between terminals, "Reset" must start a new shell process, and
        # hiding the terminals must end all of admin's bash processes
        b = self.browser
        m = self.machine

        self.login_and_go("/playground/terminal")

        b.wait_text('.pf-v6-c-toggle-group__button.pf-m-selected', "Terminal 1")

        # Run something in Terminal 1
        b.wait_in_text(line_sel(1), "admin@local")
        b.input_text("echo Terminal 1\n")
        b.wait_text(line_sel(2), "Terminal 1")

        # Switch to Terminal 2
        b.click('.pf-v6-c-toggle-group__button:contains(Terminal 2)')
        b.wait_text('.pf-v6-c-toggle-group__button.pf-m-selected', "Terminal 2")
        b.wait_not_in_text(".terminal .xterm-accessibility-tree", "Terminal 1")

        # Run something else there
        b.wait_in_text(line_sel(1), "admin@local")
        b.input_text("echo Terminal 2\n")
        b.wait_text(line_sel(2), "Terminal 2")

        # Switch back to Terminal 1, old output should still be there
        b.click('.pf-v6-c-toggle-group__button:contains(Terminal 1)')
        b.wait_text('.pf-v6-c-toggle-group__button.pf-m-selected', "Terminal 1")
        b.wait_text(line_sel(2), "Terminal 1")

        # Reset, this will output "disconnected" and start a new shell
        # process

        # Record the shell's pid first: once the next prompt is on row 5,
        # the output of "echo $$" is on row 4
        b.input_text("echo $$\n")
        b.wait_in_text(line_sel(5), "admin@local")
        old_pid = b.text(line_sel(4))

        b.click('button:contains(Reset)')
        b.click(".terminal")  # Focus the terminal again
        b.wait_in_text(line_sel(1), "disconnected")
        b.wait_in_text(line_sel(2), "admin@local")
        # Same again in the new shell: prompt on row 4 means the pid is on row 3
        b.input_text("echo $$\n")
        b.wait_in_text(line_sel(4), "admin@local")
        new_pid = b.text(line_sel(3))

        # Converting the pid string to int will throw an error if we
        # have picked up anything other than a number.

        self.assertNotEqual(int(old_pid), int(new_pid))

        # Now we should have four bash processes, one for each tab

        def count_admin_bashes():
            # Number of lines mentioning /bin/bash in admin's loginctl user status
            return int(m.execute("loginctl user-status admin | grep /bin/bash | wc -l"))

        testlib.wait(lambda: count_admin_bashes() == 4)

        # Remove all terminals, this closes all channels and kills all bash processes

        b.click("label:contains(Show terminals)")
        b.wait_not_present(".terminal")

        testlib.wait(lambda: count_admin_bashes() == 0)


# Hand over to testlib.test_main() when this file is executed as a script
if __name__ == '__main__':
    testlib.test_main()
